/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.curator.framework.recipes.leader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Abstraction to select a "leader" amongst multiple contenders in a group of JVMs connected
 * to a Zookeeper cluster. If a group of N thread/processes contends for leadership, one will
 * be assigned leader until it releases leadership at which time another one from the group will
 * be chosen.
 * </p>
 * <p>
 * Note that this class uses an underlying {@link InterProcessMutex} and as a result leader
 * election is "fair" - each user will become leader in the order originally requested
 * (from ZK's point of view).
 * </p>
 */
public class LeaderSelector implements Closeable {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    // the user's listener wrapped in a WrappedListener; start() also registers it as the
    // connection state listener
    private final LeaderSelectorListener listener;
    // executes the election task submitted by internalRequeue()
    private final CloseableExecutorService executorService;
    // acquiring this lock is what grants leadership; its lock node payload is the id when one is set
    private final InterProcessMutex mutex;
    // lifecycle: LATENT -> STARTED (via start()) -> CLOSED (via close())
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    // when true, taskDone() submits a fresh election task as the current one finishes
    private final AtomicBoolean autoRequeue = new AtomicBoolean(false);

    // guarded by synchronization
    // ourTask: the currently queued/running election task, or null when none is queued
    private Future<?> ourTask = null;
    // ourThread: the thread executing the election task (set in taskStarted(), cleared in taskDone())
    private Thread ourThread = null;

    // local leadership flag: set to true right after mutex.acquire() returns in doWork()
    private volatile boolean hasLeadership;
    // participant id written into the lock node; the empty string means "no id set"
    private volatile String id = "";

    // test hook: counted down in doWork() once leadership has been acquired
    @VisibleForTesting
    volatile CountDownLatch debugLeadershipLatch = null;

    // test hook: when non-null, doWork() awaits it before invoking takeLeadership()
    volatile CountDownLatch debugLeadershipWaitLatch = null;

    // lifecycle states of a selector instance
    private enum State {
        LATENT,
        STARTED,
        CLOSED
    }

    // thread factory used by the constructor that creates its own single-thread executor
    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("LeaderSelector");

    /**
     * Creates a selector that runs its election task on an internally created single-thread
     * executor whose threads come from the default "LeaderSelector" thread factory.
     * NOTE(review): the {@code true} flag passed to {@link CloseableExecutorService} presumably
     * requests that the executor be shut down on close - confirm against that class.
     *
     * @param client     the client
     * @param leaderPath the path for this leadership group
     * @param listener   listener
     */
    public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener) {
        this(
                client,
                leaderPath,
                new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true),
                listener);
    }

    /**
     * Creates a selector that runs its election task through the given {@link Executor}, which is
     * adapted to an {@link ExecutorService} by an internal wrapper.
     *
     * @param client        the client
     * @param leaderPath    the path for this leadership group
     * @param threadFactory factory to use for making internal threads (accepted for compatibility
     *                      only; this constructor does not use it)
     * @param executor      the executor to run in
     * @param listener      listener
     * @deprecated This constructor was poorly thought out. Custom executor is useless. Use this version instead: {@link #LeaderSelector(CuratorFramework, String, ExecutorService, LeaderSelectorListener)}
     */
    @SuppressWarnings("UnusedParameters")
    @Deprecated
    public LeaderSelector(
            CuratorFramework client,
            String leaderPath,
            ThreadFactory threadFactory,
            Executor executor,
            LeaderSelectorListener listener) {
        this(client, leaderPath, new CloseableExecutorService(wrapExecutor(executor), true), listener);
    }

    /**
     * Creates a selector that runs its election task on the supplied thread pool. The pool is
     * wrapped in a {@link CloseableExecutorService} using that class's single-argument constructor.
     *
     * @param client          the client
     * @param leaderPath      the path for this leadership group
     * @param executorService thread pool to use
     * @param listener        listener
     */
    public LeaderSelector(
            CuratorFramework client,
            String leaderPath,
            ExecutorService executorService,
            LeaderSelectorListener listener) {
        this(client, leaderPath, new CloseableExecutorService(executorService), listener);
    }

    /**
     * Primary constructor; every other constructor delegates here. Validates the arguments,
     * wraps the user's listener, and creates the {@link InterProcessMutex} whose ownership
     * represents leadership.
     *
     * @param client          the client
     * @param leaderPath      the path for this leadership group
     * @param executorService thread pool to use
     * @param listener        listener
     * @throws NullPointerException if {@code client}, {@code listener} or {@code executorService} is null
     */
    public LeaderSelector(
            CuratorFramework client,
            String leaderPath,
            CloseableExecutorService executorService,
            LeaderSelectorListener listener) {
        Preconditions.checkNotNull(client, "client cannot be null");
        PathUtils.validatePath(leaderPath);
        Preconditions.checkNotNull(listener, "listener cannot be null");
        // Fail fast: without this check a null executor only surfaced as an NPE inside start(),
        // after the state had already been flipped to STARTED.
        Preconditions.checkNotNull(executorService, "executorService cannot be null");

        this.client = client;
        // the wrapper translates CancelLeadershipException from stateChanged() into cancelElection()
        this.listener = new WrappedListener(this, listener);
        hasLeadership = false;

        this.executorService = executorService;
        mutex = new InterProcessMutex(client, leaderPath) {
            @Override
            protected byte[] getLockNodeBytes() {
                // publish the participant id as the lock node payload; null when no id was set
                return (id.length() > 0) ? getIdBytes(id) : null;
            }
        };
    }

    /**
     * Encodes a participant id as UTF-8 bytes, the form in which it is stored in the lock node.
     *
     * @param id the id to encode; must not be null
     * @return the UTF-8 encoding of {@code id}
     */
    static byte[] getIdBytes(String id) {
        // The charset constant cannot fail to resolve, so no checked-exception handling is needed.
        return id.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * By default, when {@link LeaderSelectorListener#takeLeadership(CuratorFramework)} returns, this
     * instance is not requeued. Calling this method puts the leader selector into a mode where it
     * will always requeue itself. The flag is only ever set to true here; this class offers no
     * method to switch it back off.
     */
    public void autoRequeue() {
        autoRequeue.set(true);
    }

    /**
     * Stores the ID that identifies this participant. It is the value reported for this
     * instance by {@link #getParticipants()}. IMPORTANT: call this before {@link #start()}
     * for it to have effect.
     *
     * @param id ID, must not be null
     */
    public void setId(String id) {
        // checkNotNull returns its argument, so validation and assignment fold into one statement
        this.id = Preconditions.checkNotNull(id, "id cannot be null");
    }

    /**
     * Return the ID that was set via {@link #setId(String)}. If no ID has been set, this is
     * the empty string.
     *
     * @return id
     */
    public String getId() {
        return id;
    }

    /**
     * Attempt leadership. This attempt is done in the background - i.e. this method returns
     * immediately.<br><br>
     * <b>IMPORTANT: </b> previous versions allowed this method to be called multiple times. This
     * is no longer supported. Use {@link #requeue()} for this purpose.
     *
     * @throws IllegalStateException if this selector was already started, its executor is
     *                               already shut down, or it already holds leadership
     */
    public void start() {
        // LATENT -> STARTED is a one-way transition; any second call fails here
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        Preconditions.checkState(!executorService.isShutdown(), "Already started");
        Preconditions.checkState(!hasLeadership, "Already has leadership");

        // register for connection state changes before queueing the first election attempt
        client.getConnectionStateListenable().addListener(listener);
        requeue();
    }

    /**
     * Re-queue an attempt for leadership. If this instance is already queued, nothing
     * happens and false is returned. If the instance was not queued, it is re-queued and true
     * is returned.
     *
     * <p>A queued attempt ends after a session error or once leadership is released. This method
     * is inherently hard to use as there is no public API to guarantee a successful requeue. Try
     * {@link #autoRequeue()} if you are in doubt.</p>
     *
     * @return true if re-queue is successful
     * @throws IllegalStateException if this selector is not in the started state (never started,
     *                               or already closed)
     */
    public boolean requeue() {
        Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
        return internalRequeue();
    }

    /**
     * Submits a new election task unless one is already queued or the selector is not started.
     * Synchronized so that the check of {@code ourTask} and its assignment happen atomically.
     *
     * @return true if a new task was submitted, false otherwise
     */
    private synchronized boolean internalRequeue() {
        boolean alreadyQueued = (ourTask != null);
        if (alreadyQueued || (state.get() != State.STARTED)) {
            return false;
        }

        Callable<Void> electionTask = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                doWorkLoop();
                return null;
            }
        };
        ourTask = executorService.submit(electionTask);
        return true;
    }

    /**
     * Shutdown this selector and remove yourself from the leadership group. Moves the state
     * from STARTED to CLOSED, unregisters the connection state listener, closes the executor
     * service and forgets the current election task.
     *
     * @throws IllegalStateException if the selector was never started or is already closed
     */
    public synchronized void close() {
        Preconditions.checkState(
                state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");

        client.getConnectionStateListenable().removeListener(listener);
        // NOTE(review): presumably cancels/interrupts the running election task - confirm
        // against CloseableExecutorService.close()
        executorService.close();
        ourTask = null;
    }

    /**
     * <p>
     * Returns the set of current participants in the leader selection
     * </p>
     * <p>
     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
     * return a value that does not match {@link #hasLeadership()} as hasLeadership
     * uses a local field of the class.
     * </p>
     *
     * @return participants
     * @throws Exception ZK errors, interruptions, etc.
     */
    public Collection<Participant> getParticipants() throws Exception {
        return getParticipants(client, mutex.getParticipantNodes());
    }

    /**
     * Builds the participant list for the given lock nodes, in iteration order. The first node
     * whose data can still be read is marked as the leader; all later ones are not.
     *
     * @param client           client used to read each node's data
     * @param participantNodes paths of the participant nodes
     * @return immutable list of the participants that could be read
     * @throws Exception ZK errors, interruptions, etc.
     */
    static Collection<Participant> getParticipants(CuratorFramework client, Collection<String> participantNodes)
            throws Exception {
        ImmutableList.Builder<Participant> participants = ImmutableList.builder();

        boolean leaderFound = false;
        for (String nodePath : participantNodes) {
            Participant candidate = participantForPath(client, nodePath, !leaderFound);
            if (candidate == null) {
                continue; // node no longer exists; skip it without consuming the leader slot
            }

            participants.add(candidate);
            leaderFound = true; // by definition the first readable node is the leader
        }

        return participants.build();
    }

    /**
     * <p>
     * Return the id for the current leader. If for some reason there is no
     * current leader, a dummy participant is returned.
     * </p>
     * <p>
     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
     * return a value that does not match {@link #hasLeadership()} as hasLeadership
     * uses a local field of the class.
     * </p>
     *
     * @return leader
     * @throws Exception ZK errors, interruptions, etc.
     */
    public Participant getLeader() throws Exception {
        return getLeader(client, mutex.getParticipantNodes());
    }

    /**
     * Finds the leader among the given lock nodes: the first node, in iteration order, whose
     * data can still be read. Falls back to a default-constructed {@link Participant} when no
     * node yields one.
     *
     * @param client           client used to read each node's data
     * @param participantNodes paths of the participant nodes
     * @return the leader, or a dummy participant if there is none
     * @throws Exception ZK errors, interruptions, etc.
     */
    static Participant getLeader(CuratorFramework client, Collection<String> participantNodes) throws Exception {
        for (String nodePath : participantNodes) {
            Participant leader = participantForPath(client, nodePath, true);
            if (leader != null) {
                return leader;
            }
        }

        // no nodes at all, or every node vanished before it could be read
        return new Participant();
    }

    /**
     * Return true if leadership is currently held by this instance. This reads a local volatile
     * flag and does not contact the ZK server.
     *
     * @return true/false
     */
    public boolean hasLeadership() {
        return hasLeadership;
    }

    /**
     * Records the thread now running the election task so that {@link #cancelElection()}
     * can interrupt it.
     */
    private synchronized void taskStarted() {
        ourThread = Thread.currentThread();
    }

    /**
     * Marks the current election task as finished: clears the task and thread references,
     * resets the leadership flag and, when auto-requeue is enabled, submits the next election task.
     *
     * @return true if leadership was held when this method was entered; the caller releases
     *         the mutex only in that case
     */
    private synchronized boolean taskDone() {
        ourTask = null;
        ourThread = null;
        // We are about to complete the very last steps in election task, there is
        // no synchronization point after this method. Safety:
        // * Next task will not run into election body before this method return, so no
        //   interference on hasLeadership.
        // * mutex is bound to current thread(e.g. not JVM), so leadership flag reset
        //   and leadership release, which is a time-consuming task, are not necessary
        //   to be atomic. Also, it is safe to run mutex.release() in parallel with
        //   mutex.acquire() from next election task.
        boolean leadership = hasLeadership;
        if (leadership) {
            hasLeadership = false;
        }
        // ourTask was cleared above, so internalRequeue() can submit a new task here
        if (autoRequeue.get()) {
            internalRequeue();
        }
        return leadership;
    }

    /**
     * Cancel ongoing election regardless of leadership. This interrupts the thread recorded by
     * {@link #taskStarted()}, if there is one; when no election task is running it does nothing.
     */
    private synchronized void cancelElection() {
        // Correctness with requeue:
        // * Cancel, taskStarted and taskDone are guarded by synchronized(this).
        // * If ourThread is null, new task will observe this cancellation after taskStarted.
        // * If ourThread is not null, old task will be cancelled and new task will observe
        //   this cancellation.
        if (ourThread != null) {
            ourThread.interrupt();
        }
    }

    /**
     * Attempt to cancel and interrupt the current leadership if this instance has leadership.
     * When the local leadership flag is not set, this method does nothing.
     */
    public synchronized void interruptLeadership() {
        if (hasLeadership) {
            cancelElection();
        }
    }

    /**
     * Reads a participant node and turns its payload into a {@link Participant}.
     *
     * @param client       client used to read the node
     * @param path         full path of the participant node
     * @param markAsLeader whether the resulting participant should be flagged as leader
     * @return the participant, or null if the node no longer exists
     * @throws Exception ZK errors, interruptions, etc.
     */
    private static Participant participantForPath(CuratorFramework client, String path, boolean markAsLeader)
            throws Exception {
        try {
            byte[] bytes = client.getData().forPath(path);
            // decode with the charset constant rather than a by-name lookup
            String thisId = new String(bytes, StandardCharsets.UTF_8);
            return new Participant(thisId, markAsLeader);
        } catch (KeeperException.NoNodeException e) {
            // the node disappeared between listing and reading; callers skip null results
            return null;
        }
    }

    // test hook: when non-null, incremented each time mutex.release() throws in doWork()
    @VisibleForTesting
    volatile AtomicInteger failedMutexReleaseCount = null;

    /**
     * Runs one election round on the calling thread: blocks until the mutex is acquired, marks
     * this instance as leader, invokes the listener's {@code takeLeadership}, and finally gives
     * leadership up again by releasing the mutex.
     *
     * <p>This method must not be called concurrently to obey guarantee to
     * {@link LeaderSelectorListener#takeLeadership(CuratorFramework)}.</p>
     *
     * @throws Exception errors from acquiring the mutex, or {@link InterruptedException} if the
     *                   election or the leadership was interrupted
     */
    @VisibleForTesting
    void doWork() throws Exception {
        taskStarted();
        hasLeadership = false;
        try {
            mutex.acquire();

            // from here on this instance is the leader
            hasLeadership = true;
            try {
                if (debugLeadershipLatch != null) {
                    debugLeadershipLatch.countDown();
                }
                if (debugLeadershipWaitLatch != null) {
                    debugLeadershipWaitLatch.await();
                }
                listener.takeLeadership(client);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (Throwable e) {
                // any other failure from the listener is not rethrown; leadership is simply
                // given up in the finally block below
                // NOTE(review): checkInterrupted presumably restores the interrupt flag when e
                // is an InterruptedException - confirm against ThreadUtils
                ThreadUtils.checkInterrupted(e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } finally {
            // taskDone() reports whether leadership was held; the mutex is released only then
            if (taskDone()) {
                boolean wasInterrupted =
                        Thread.interrupted(); // clear any interrupted status so that mutex.release() works immediately
                try {
                    mutex.release();
                } catch (Exception e) {
                    if (failedMutexReleaseCount != null) {
                        failedMutexReleaseCount.incrementAndGet();
                    }

                    ThreadUtils.checkInterrupted(e);
                    log.error("The leader threw an exception", e);
                    // ignore errors - this is just a safety
                } finally {
                    // restore the interrupt status that was cleared before the release
                    if (wasInterrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Body of the submitted election task: runs a single {@link #doWork()} round and decides
     * what to do with its failure. Interruption is absorbed (the interrupt flag is restored);
     * connection loss and session expiry are rethrown only when auto-requeue is off.
     *
     * @throws Exception the connection-loss/session-expired error when not auto-requeueing, or
     *                   any other error raised by {@link #doWork()}
     */
    private void doWorkLoop() throws Exception {
        KeeperException connectionFailure = null;
        try {
            doWork();
        } catch (KeeperException.ConnectionLossException e) {
            connectionFailure = e;
        } catch (KeeperException.SessionExpiredException e) {
            connectionFailure = e;
        } catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }

        if (connectionFailure == null) {
            return;
        }

        // autoRequeue should ignore connection loss or session expired and just keep trying
        if (!autoRequeue.get()) {
            throw connectionFailure;
        }
    }

    /**
     * Temporary wrapper for the deprecated constructor: adapts a plain {@link Executor} to the
     * {@link ExecutorService} interface. Tasks are handed straight to the wrapped executor; the
     * wrapper itself only tracks shutdown/terminated flags and cannot stop the wrapped executor.
     *
     * @param executor the executor that actually runs submitted tasks
     * @return an {@link ExecutorService} view over {@code executor}
     */
    private static ExecutorService wrapExecutor(final Executor executor) {
        return new AbstractExecutorService() {
            private volatile boolean isShutdown = false;
            private volatile boolean isTerminated = false;

            @Override
            public void shutdown() {
                isShutdown = true;
            }

            @Override
            public List<Runnable> shutdownNow() {
                // shutdownNow() initiates shutdown just like shutdown(), so isShutdown() must
                // report true afterwards; previously the flag was left untouched here
                isShutdown = true;
                // the wrapper keeps no queue of its own, so there are never pending tasks to return
                return new ArrayList<Runnable>();
            }

            @Override
            public boolean isShutdown() {
                return isShutdown;
            }

            @Override
            public boolean isTerminated() {
                return isTerminated;
            }

            @Override
            public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
                throw new UnsupportedOperationException();
            }

            @Override
            public void execute(Runnable command) {
                try {
                    executor.execute(command);
                } finally {
                    // once a command has been handed over, the wrapper reports itself as
                    // shut down and terminated
                    isShutdown = true;
                    isTerminated = true;
                }
            }
        };
    }

    /**
     * Listener decorator installed by the selector. Leadership callbacks are forwarded
     * untouched; connection state callbacks are forwarded too, but a
     * {@link CancelLeadershipException} thrown by the user's listener is turned into a
     * cancellation of the selector's ongoing election.
     */
    private static class WrappedListener implements LeaderSelectorListener {
        private final LeaderSelector selector;
        private final LeaderSelectorListener delegate;

        /**
         * @param leaderSelector selector whose election is cancelled on request
         * @param listener       the user's listener that receives all callbacks
         */
        public WrappedListener(LeaderSelector leaderSelector, LeaderSelectorListener listener) {
            this.selector = leaderSelector;
            this.delegate = listener;
        }

        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            delegate.takeLeadership(client);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            try {
                delegate.stateChanged(client, newState);
            } catch (CancelLeadershipException cancelRequest) {
                // If we cancel only leadership but not whole election, then we could hand over
                // dated leadership to client with no further cancellation. Dated leadership is
                // possible due to separated steps in leadership acquire: server data(e.g. election sequence)
                // change and client flag(e.g. hasLeadership) set.
                selector.cancelElection();
            }
        }
    }
}
